package com.atguigu.flink.chapter11.time;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Event-time demo for the Flink Table API.
 *
 * <p>Registers a {@code sensor} table through SQL DDL, derives the event-time column
 * {@code et} from the {@code bigint} column {@code ts}, declares a watermark on it, and
 * prints the event time after formatting it and passing it through {@code CONVERT_TZ}.
 *
 * @Author lzc
 * @Date 2022/7/12 11:12
 */
public class Flink02_Time_Event_2 {

    /**
     * DDL for the {@code sensor} table.
     *
     * <p>{@code ts} is declared as {@code bigint}; the computed column {@code et} is
     * {@code to_timestamp_ltz(ts, 3)} and carries a watermark of {@code et} minus three
     * seconds. Rows are read by the {@code filesystem} connector from
     * {@code input/sensor.txt} in {@code csv} format.
     */
    private static final String CREATE_SENSOR_TABLE =
        "create table sensor("
            + " id string, "
            + " ts bigint, "
            + " vc int, "
            + " et as to_timestamp_ltz(ts, 3), "
            + " watermark for et as et - interval '3' second"
            + ")with("
            + " 'connector' = 'filesystem', "
            + " 'path' = 'input/sensor.txt', "
            + " 'format' = 'csv' "
            + ")";

    /**
     * Query that renders {@code et} as a {@code yyyy-MM-dd HH:mm:ss} string and converts
     * that string from {@code 'UTC'} to {@code 'GMT-08:00'}.
     *
     * <p>NOTE(review): the {@code 'UTC'} source-zone argument presumably assumes that
     * {@code date_format} renders {@code et} in UTC; whether that holds likely depends on
     * the session time zone -- confirm against the Flink time-zone documentation.
     */
    private static final String EVENT_TIME_QUERY =
        "select CONVERT_TZ(date_format(et, 'yyyy-MM-dd HH:mm:ss'), 'UTC', 'GMT-08:00') from sensor";

    /**
     * Builds the table environment, registers the {@code sensor} table and prints the
     * result of the event-time query.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared so that any failure propagates to the caller
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a parallelism of one.
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Optional: set the default (session) time zone to a zero-hour offset.
        // Left disabled, as in the original experiment:
        //     tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        // Register the source table: bigint ts -> timestamp(3) event-time column et.
        tEnv.executeSql(CREATE_SENSOR_TABLE);

        // Run the query and print its rows to standard output.
        tEnv.sqlQuery(EVENT_TIME_QUERY)
            .execute()
            .print();
    }
}
